iT邦幫忙

2021 iThome 鐵人賽

DAY 22
0
Modern Web

『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作系列 第 22

卡夫卡的藏書閣【Book22】- Kafka - KafkaJS 消費者 4

  • 分享至 

  • xImage
  •  

“Love is, that you are the knife which I plunge into myself.”
― Kafka, Franzv


重新設定偏移量 ( Seek )

消費者可以重新設定對於某個主題/分區的偏移量,只要使用方法 seek,這個方法可以在消費者被初始化且開始運作後被呼叫

await consumer.connect()
await consumer.subscribe({ topic: 'example' })

consumer.run({ eachMessage: async ({ topic, message }) => true })
consumer.seek({ topic: 'example', partition: 0, offset: 12384 })

如果在生效的批次中重新設定偏移量,會將該批次的訊息都標註為陳舊訊息並且棄用,這是為了去確保下一個消費的訊息是從重新設定的偏移量開始消費,所以在使用 eachBatch 方法時,請記得要用 isStale() 檢查訊息是否是陳舊訊息
預設的情況下生產者會自動提交重新設定的偏移量,將生產者的自動提交(autoCommit)參數關閉可以避免這個行為

consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, message }) => true
})

consumer.seek({ topic: 'example', partition: 0, offset: "12384" })

客製化分區器 (Custom partition assigner)

KafkaJS 預設是按照順序( round robin )去分配分區的,但其實可以自己客製化分區的邏輯給消費者群組使用
一個分區器是一個會回傳帶有介面的物件的方法,以下為範例

const MyPartitionAssigner = ({ cluster }) => ({
    name: 'MyPartitionAssigner',
    version: 1,
    async assign({ members, topics }) {},
    protocol({ topics }) {}
})

這方法必須針對每個主題的分區去回傳分派計畫,分派計畫是一個由 memberId 列表和 memberAssignment 組成的,memberAssignment 必須用 MemberAssignment 加密,範例如下

const { AssignerProtocol: { MemberAssignment } } = require('kafkajs')

const MyPartitionAssigner = ({ cluster }) => ({
    version: 1,
    async assign({ members, topics }) {
        // perform assignment
        return myCustomAssignmentArray.map(memberId => ({
            memberId,
            memberAssignment: MemberAssignment.encode({
                version: this.version,
                assignment: assignment[memberId],
            })
        }))
    }
})

方法 protocol 要回傳 namemetadatametadata 必須要用 MemberMetadata 去加密,以下為範例

const { AssignerProtocol: { MemberMetadata } } = require('kafkajs')

const MyPartitionAssigner = ({ cluster }) => ({
    name: 'MyPartitionAssigner',
    version: 1,
    protocol({ topics }) {
        return {
            name: this.name,
            metadata: MemberMetadata.encode({
            version: this.version,
            topics,
            }),
        }
    }
})

assigner 完成後將它加到分配器的清單中,這邊的重點是要記得要設定預設的分配器去讓舊的消費者可以使用

const { PartitionAssigners: { roundRobin } } = require('kafkajs')

kafka.consumer({
    groupId: 'my-group',
    partitionAssigners: [
        MyPartitionAssigner,
        roundRobin
    ]
})

Describe group

這個參數是新加入、實驗階段的參數,未來可能會被移除或是修改掉,這個參數會回傳消費者群組的元資料

const data = await consumer.describeGroup()
// {
//  errorCode: 0,
//  groupId: 'consumer-group-id-f104efb0e1044702e5f6',
//  members: [
//    {
//      clientHost: '/172.19.0.1',
//      clientId: 'test-3e93246fe1f4efa7380a',
//      memberAssignment: Buffer,
//      memberId: 'test-3e93246fe1f4efa7380a-ff87d06d-5c87-49b8-a1f1-c4f8e3ffe7eb',
//      memberMetadata: Buffer,
//    },
//  ],
//  protocol: 'RoundRobinAssigner',
//  protocolType: 'consumer',
//  state: 'Stable',
// },

上一篇
卡夫卡的藏書閣【Book21】- Kafka - KafkaJS 消費者 3
下一篇
卡夫卡的藏書閣【Book23】- Kafka - KafkaJS 監控狀態事件
系列文
『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言